package gl.java.mq;

import gl.java.umsp.*;
import gl.java.util.JsonUtil;
import io.netty.channel.*;
import org.json.JSONArray;
import org.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;

@ChannelHandler.Sharable
public class Consumer extends SimpleChannelInboundHandler<UmspHeader> {
    private final Receiver mReceiver;

    public interface Receiver {
        void onMessage(String topic ,String msg);
    }

    public void channelActive(final ChannelHandlerContext ctx) throws InterruptedException {
        JSONArray object = null;
        try {
            object = new JSONArray(this.mTopic);
        } catch (JSONException e) {
            e.printStackTrace();
        }
        log.info("[Consumer] connect : "+ Arrays.toString(this.mTopic));
        NettyMsgHelper.buildSubMsg(ctx, object);
    }
    public void channelInActive(final ChannelHandlerContext ctx) throws InterruptedException {
        log.info("[Consumer] disconnect: "+ Arrays.toString(this.mTopic));
    }


    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, UmspHeader umspHeader) throws Exception {
        try {
            if (mReceiver != null) {
                Packet packet= JsonUtil.fromJson(new String(umspHeader.payload), Packet.class);
                this.mReceiver.onMessage(packet.topic,packet.msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.warn(e.getMessage());
        }
    }

    private final String[] mTopic;
    private final String mHost;
    private final int mPort;
    private static Logger log = LoggerFactory.getLogger(Consumer.class);

    public Consumer(String host, int port, Consumer.Receiver receiver, String... topic) {
        this.mHost = host;
        this.mPort = port;
        this.mTopic = topic;
        this.mReceiver = receiver;
    }

    public void start() {
        new NettyClient().start(mHost, mPort, Consumer.this);
    }


}
